-
Notifications
You must be signed in to change notification settings - Fork 27
Support drop_table, partitions and offsets methods in python bindings #150
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds two administrative operations to the Python bindings to achieve feature parity with C++ bindings: drop_table() for deleting tables and list_offsets() for querying bucket offsets. Both methods follow existing async patterns and leverage core APIs that already exist in the Rust codebase.
Changes:
- Added
Admin.drop_table()method with optionalignore_if_not_existsparameter - Added
Admin.list_offsets()method supporting earliest, latest, and timestamp-based offset queries - Introduced
OffsetTypeclass with string constants for type-safe offset type specification - Updated example.py to demonstrate both new features
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| bindings/python/src/lib.rs | Adds OffsetType class definition with string constants and registers it in the Python module |
| bindings/python/src/admin.rs | Implements drop_table() and list_offsets() admin methods with validation and error handling |
| bindings/python/example/example.py | Demonstrates usage of new methods with both string literals and OffsetType constants |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
359fbf3 to
753c060
Compare
753c060 to
ffd6c8d
Compare
|
resolved conflict |
ffd6c8d to
0b4b9c0
Compare
|
@luoyuxia PTAL 🙏 |
0b4b9c0 to
e6f0034
Compare
|
I'll add partition support, since I see we added this to cpp bindings |
e6f0034 to
32e6418
Compare
|
@luoyuxia I've reworked this PR to support partitioned tables and have the same methods as CPP bindings have atm. |
|
I think #246 should be merged first and then I'll rebase one more time. |
|
@fresh-borzoni Sorry for miss it. I'll review this weekend. |
|
@fresh-borzoni Could you please rebase it? |
32e6418 to
2931a6e
Compare
|
@luoyuxia rebased and renamed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| let cache = self.partition_name_cache.read().unwrap(); | ||
| if let Some(map) = cache.as_ref() { | ||
| return Ok(map.clone()); | ||
| } | ||
| } | ||
|
|
||
| // Fetch partition infos (releases GIL during async call) | ||
| let partition_infos: Vec<fcore::metadata::PartitionInfo> = py | ||
| .detach(|| { | ||
| TOKIO_RUNTIME.block_on(async { self.admin.list_partition_infos(table_path).await }) | ||
| }) | ||
| .map_err(|e| FlussError::new_err(format!("Failed to list partition infos: {e}")))?; | ||
|
|
||
| // Build and cache the mapping | ||
| let map: HashMap<i64, String> = partition_infos | ||
| .into_iter() | ||
| .map(|info| (info.get_partition_id(), info.get_partition_name())) | ||
| .collect(); | ||
|
|
||
| // Store in cache (write lock) | ||
| { | ||
| let mut cache = self.partition_name_cache.write().unwrap(); |
Copilot
AI
Feb 5, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The unwrap() calls on RwLock::read() and RwLock::write() will panic if the lock is poisoned (which happens when a thread panics while holding the lock). While poison is rare in Python bindings due to controlled execution, it's better to handle this gracefully. Consider using expect() with a descriptive message or mapping to a PyErr using map_err() instead of unwrap().
luoyuxia
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fresh-borzoni Thanks for the pr. LGTM
Summary
Adds admin operations and partition support to achieve feature parity with Rust/C++ bindings.
Admin Operations
drop_table(table_path, ignore_if_not_exists=False)- Drop a table from the clusterlist_offsets(table_path, bucket_ids, offset_type, timestamp=None)- List offsets for non-partitioned tableslist_partition_offsets(table_path, partition_name, bucket_ids, offset_type, timestamp=None)- List offsets for partitioned tablescreate_partition(table_path, partition_spec, ignore_if_exists=False)- Create a partitionlist_partition_infos(table_path)- List all partitions for a tableLogScanner Low-Level API
Replaces high-level subscribe with low-level methods matching Rust/C++:
subscribe(bucket_id, start_offset)- Subscribe to a single bucketsubscribe_batch(bucket_offsets)- Subscribe to multiple bucketssubscribe_partition(partition_id, bucket_id, start_offset)- Subscribe to partitioned table bucketto_arrow()/to_pandas()now work for both partitioned and non-partitioned tablesCloses #148 #244